Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Use models literal StructuredDataset to enable sd bypass task #2954

Merged
merged 7 commits into from
Dec 2, 2024

Conversation

JiangJiaWei1103
Copy link
Contributor

@JiangJiaWei1103 JiangJiaWei1103 commented Nov 25, 2024

Tracking issue

Closes flyteorg/flyte#5956.

Why are the changes needed?

For a flyte task with python StructuredDataset as an input from dataclass attribute access (i.e., dc.sd), it fails to convert the task output LiteralMap back to flyte idl. On the highest level, the process of calling _dispatch_execute (here) is illustrated as follows:

pythontask_sd_io

As can be observed, the problem (marked as orange) occurs when the input LiteralMap is converted to python natives, during which a Literal is built with python native StructuredDataset, instead of using the correct literals.StructuredDataset (please refer to this code snippet).

This makes conversion from the task output LiteralMap back to flyte idl fail because python native StructuredDataset has no to_flyte_idl method.

What changes were proposed in this pull request?

Directly construct a literals.StructuredDataset within dict_to_structured_dataset method of StructuredDatasetTransformerEngine.

How was this patch tested?

  1. Execute the command run in the K8s pod, as suggested by @pingsutw:
    • Describe the pod executing the remote task in the reported issue.
    • Copy the command in the pod and reproduce locally.

After fixing, outputs.pb now can be successfully written to s3 storage.
Screenshot 2024-11-26 at 9 52 57 PM

Screenshot 2024-11-26 at 9 54 19 PM

  1. Do remote sandbox test with flytekit installed by commit sha in this PR.
from dataclasses import dataclass, field

from flytekit import task, workflow, ImageSpec
from flytekit.types.structured import StructuredDataset


# Build docker image and push to the local registry
flytekit_hash = "d5d8620cc0494a148adb6bb4a13978d810d9673a"  # Refer to this PR
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"
image = ImageSpec(
    packages=[
        flytekit,
        "pandas",
        "pyarrow"
    ],
    apt_packages=["git"],
    registry="localhost:30000",
)


@dataclass
class DC:
    sd: StructuredDataset = field(
        default_factory=lambda: StructuredDataset(
            uri="s3://my-s3-bucket/s3_flyte_dir/df.parquet", 
            file_format="parquet"
        )
    )


@task(container_image=image)
def t_sd_attr(sd: StructuredDataset) -> StructuredDataset:
    return sd


@workflow
def wf(dc: DC):
    t_sd_attr(sd=dc.sd)

Run the following command:

pyflyte run --remote reprod.py wf --dc '{"dc": {"sd": {"uri": "s3://my-s3-bucket/s3_flyte_dir/df.parquet", "file_format": "parquet"}}}'

The remote sandbox test result is shown as follows:

Screenshot 2024-11-26 at 10 14 03 PM

Setup process

git clone https://github.com/flyteorg/flytekit.git
gh pr checkout 2954
make setup && pip install -e .

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

#2914

Docs link

@JiangJiaWei1103
Copy link
Contributor Author

JiangJiaWei1103 commented Nov 26, 2024

Follow-up

Because the reported bug is raised in the remote run (which involves communication with s3), I think it's hard to test in both unit test and integration test, as we've discussed before here. If there's any possibility to better test this fix, I'll handle it!

@@ -197,12 +197,25 @@ def return_sd() -> StructuredDataset:
return df

For details, please refer to this issue: https://github.com/flyteorg/flyte/issues/5954.

2. Need access to self._literal_sd when converting task output back to flyteidl, please see:
https://github.com/flyteorg/flytekit/blob/master/flytekit/bin/entrypoint.py#L326
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use github permanent link?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me fix it. Thanks!

@JiangJiaWei1103
Copy link
Contributor Author

JiangJiaWei1103 commented Nov 28, 2024

Follow-up

The last commit aims at simplifying the building logic of sd_literal. Instead of constructing a dummy python native StructuredDataset then calling set_literal, we directly build sd_literal here. Thanks for @Future-Outlier 's suggestion.

The next steps are:

  1. Make sure this patch passes the sandbox test using the image built with the last commit sha.
  2. Handle the empty-string file_format issue.

@Future-Outlier
Copy link
Member

Follow-up

The last commit aims at simplifying the building logic of sd_literal. Instead of constructing a dummy python native StructuredDataset then calling set_literal, we directly build sd_literal here. Thanks for @Future-Outlier 's suggestion.

The next steps are:

  1. Make sure this patch passes the sandbox test using the image built with the last commit sha.
  2. Handle the empty-string file_format issue.
  1. you can use single binary to test it, this will be quicker.
  2. this will help a lot, thank you man

Copy link

codecov bot commented Nov 28, 2024

Codecov Report

Attention: Patch coverage is 11.11111% with 8 lines in your changes missing coverage. Please review.

Project coverage is 78.28%. Comparing base (f938661) to head (7ce7ea8).
Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/types/structured/structured_dataset.py 11.11% 8 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2954      +/-   ##
==========================================
+ Coverage   75.14%   78.28%   +3.13%     
==========================================
  Files         200      200              
  Lines       20919    20928       +9     
  Branches     2692     2693       +1     
==========================================
+ Hits        15720    16383     +663     
+ Misses       4445     3745     -700     
- Partials      754      800      +46     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@JiangJiaWei1103
Copy link
Contributor Author

Hi @Future-Outlier,

The last commit passes the test example you provided me yesterday, as shown below:

Screenshot 2024-11-29 at 11 08 33 PM

For the first question, I do use a single binary to run remote tests as suggested in the official setup. In step 2, flytectl demo start --dev is used to start the k3s cluster and this environment is referred to as Flyte sandbox, so I just call it a sandbox test. Sorry for the dumb question here, but I would like to further clarify the difference and refine my understanding. Thanks very much!

For the empty-string file_format, I suspect this information is erased before inputs.pb is loaded from s3. Following shows the loaded input proto:
Screenshot 2024-11-29 at 11 27 50 PM

As can be observed, the information of file_format is absent. Hence, I think I'll continue tracing the serialization and registration process to find out what's going on.

@JiangJiaWei1103
Copy link
Contributor Author

JiangJiaWei1103 commented Nov 30, 2024

Further Discussion - Logic behind extract_cols_and_format

For the last commit, we drop the redundant handling of column information of StructuredDataset in dict_to_structured_dataset. First of all, extract_cols_and_format isn't exclusive to deprecated FlyteSchema. It's used to iterate through Annotated and extract the information like base type, column information, etc. Following demonstrates a simple example about extracting one column out of a pd.DataFrame and returning one-column df:

all_cols = kwtypes(name=str, height=int)
col = kwtypes(height=int)

@task(container_image=image)
def t1(sd: Annotated[StructuredDataset, all_cols]) -> Annotated[StructuredDataset, col]:
    df = sd.open(pd.DataFrame).all()

    return StructuredDataset(dataframe=df)

For these cases, async_to_python_value can already handle column information, as can be seen here and here. Hence, we just drop the redundant handling.

Test Result

Due to the limitation on unit tests and integration tests now, we only test this patch by single binary.

Screenshot 2024-11-30 at 4 57 32 PM

Empty-String file_format

I'll open another issue containing complete experiments and observations this week. And, I'll continue solving that problem.

Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really Nice PR.
I can tell you put lots of efforts.
Let's make Flyte GREAT!

@Future-Outlier Future-Outlier enabled auto-merge (squash) December 2, 2024 03:21
@Future-Outlier Future-Outlier merged commit 0b0e338 into flyteorg:master Dec 2, 2024
29 checks passed
@JiangJiaWei1103
Copy link
Contributor Author

Thanks for your support and patience, bro!

Let's move on and make Flyte better!!

arbaobao pushed a commit to arbaobao/flytekit that referenced this pull request Dec 4, 2024
…lyteorg#2954)

* fix: Use models literal sd to enable sd bypass task

Signed-off-by: JiaWei Jiang <[email protected]>

* Reuse sd _set_literal with a public wrapper method

Signed-off-by: JiaWei Jiang <[email protected]>

* Use github permanent link

Signed-off-by: JiaWei Jiang <[email protected]>

* Simplify sd_literal building logic

Signed-off-by: JiaWei Jiang <[email protected]>

* Drop unnecessary column handling

Signed-off-by: JiaWei Jiang <[email protected]>

---------

Signed-off-by: JiaWei Jiang <[email protected]>
Co-authored-by: Future-Outlier <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

[BUG] Attribute Access Structured Dataset from Dataclass and return it will fail
2 participants